12 Python面试题
1、FastAPI和Flask有什么区别?
FastAPI 和 Flask 都是 Python Web 框架,但设计理念和适用场景差别挺大。
Flask 比较老,是同步框架。写起来简单,上手快,适合小型项目或者快速原型。但它默认不支持异步,性能在高并发场景下比较一般。
FastAPI 是后来出的,基于 Starlette 和 Pydantic 构建,原生支持异步。它的几个特点让它在 Agent 项目里更受欢迎。
第一,原生异步。
FastAPI 的路由函数可以直接用 async def,配合 await 调用模型、数据库、Redis,不会阻塞。Agent 系统经常要等模型响应、等工具返回,异步比同步效率高很多。
第二,自动数据校验。
FastAPI 集成了 Pydantic,请求参数、响应体可以直接用模型定义,自动校验类型和格式。不用自己写一堆校验逻辑。
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class ChatRequest(BaseModel):
message: str
session_id: str
@app.post("/chat")
async def chat(request: ChatRequest):
return {"reply": "你好"}第三,自动生成 API 文档。
FastAPI 会自动根据代码生成 Swagger 文档,不用手写 OpenAPI spec。对团队协作和调试很方便。
第四,性能更好。
FastAPI 底层用了 Starlette,支持 ASGI,并发能力比 Flask 强不少。
Flask 不是不能用,只是 Agent 项目天然需要大量异步 IO,FastAPI 更合适。如果是简单的管理后台或者内部小工具,Flask 仍然可以选。
2、FastAPI为什么适合Agent项目?
FastAPI 适合 Agent 项目,主要是 Agent 系统的几个特点和 FastAPI 的能力比较匹配。
第一,Agent 系统 IO 密集。
每次请求都要调模型、查数据库、调工具、做检索,全是 IO 操作。FastAPI 原生异步,可以同时处理多个请求,不用等一个请求完全结束再处理下一个。
第二,Agent 输入输出需要结构化。
用户发过来的消息、session_id、用户身份,都要校验。FastAPI 配合 Pydantic,可以直接定义请求和响应的结构,不合法的数据自动拦截。
class AgentRequest(BaseModel):
message: str
session_id: str
user_id: str
class AgentResponse(BaseModel):
answer: str
tool_calls: list[str] = []
sources: list[str] = []第三,Agent 经常需要流式输出。
模型生成回答需要时间,用户不希望等很久才看到第一个字。FastAPI 支持 StreamingResponse,可以把模型的流式输出直接推给前端。
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(request: AgentRequest):
async def generate():
async for chunk in agent.astream(...):
yield f"data: {chunk}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")第四,和 LangChain / LangGraph v1.0 配合方便。
LangGraph v1.0 的 astream 接口返回异步迭代器,可以直接在 FastAPI 的路由里消费,做成 SSE 推送给前端。
第五,依赖注入好用。
FastAPI 的 Depends 机制可以把用户认证、数据库连接、Redis 客户端这些依赖注入到路由函数里,代码更干净。
3、Python异步编程如何实现?
Python 异步编程主要靠 async 和 await 关键字,配合 asyncio 事件循环。
基本写法是这样的。先定义一个协程函数:
import asyncio
async def fetch_data(url: str):
await asyncio.sleep(1) # 模拟网络请求
return {"data": "result"}async def 定义的函数不是普通函数,调用它不会执行代码,而是返回一个协程对象。要执行它,需要用 await 或者放进事件循环。
async def main():
result = await fetch_data("https://example.com")
print(result)
asyncio.run(main())await 的意思是:这个操作需要等,等的时候把控制权交出去,让事件循环去做别的事。
多个异步任务可以并发执行:
async def main():
task1 = fetch_data("url1")
task2 = fetch_data("url2")
task3 = fetch_data("url3")
results = await asyncio.gather(task1, task2, task3)三个请求同时发起,不用等一个完了再发下一个。
在 Agent 项目里,异步特别有用。比如一个请求需要同时查数据库、查 Redis、调模型,可以并发执行:
async def handle_request(user_id: str):
user_info, memory, search_results = await asyncio.gather(
get_user_info(user_id),
get_memory(user_id),
search_docs(query)
)FastAPI 里直接用 async def 定义路由就行,框架会自动处理事件循环。
4、asyncio原理是什么?
asyncio 的核心是事件循环(Event Loop)。
事件循环可以理解成一个调度器。它维护一个任务队列,不断地从队列里取出任务执行。当一个任务遇到 await,就把它挂起,去执行下一个任务。等被挂起的任务完成了,再把它放回来继续执行。
简单画一下流程:
事件循环启动
↓
取出任务A,开始执行
↓
任务A遇到 await(比如等网络响应)
↓
把任务A挂起,取出任务B执行
↓
任务B遇到 await,挂起
↓
任务A的网络响应回来了,恢复任务A继续执行
↓
...不断循环关键点在于:Python 的 asyncio 是单线程的。所有协程在同一个线程里运行,通过协作式调度轮流执行。
这和线程不一样。线程是操作系统级别的抢占式调度,多个线程可以在不同 CPU 核心上跑。asyncio 的协程都在一个线程里,靠 await 主动让出控制权。
这也是为什么异步代码里不能写阻塞操作。如果在协程里调用 time.sleep(10) 或者做 CPU 密集计算,整个事件循环都会被卡住。
遇到阻塞操作要用 run_in_executor 放到线程池里:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
async def blocking_task():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, heavy_computation)
return result在 Agent 项目里,模型调用、数据库查询、HTTP 请求这些 IO 操作用异步没问题。但如果要做大量文本处理、向量计算这种 CPU 密集的事,最好丢到线程池或进程池里。
5、协程和线程有什么区别?
协程和线程都能实现并发,但原理和适用场景不一样。
第一,调度方式不同。
线程是操作系统调度的,多个线程可以在不同 CPU 核心上跑,是真正的并行(受 GIL 限制,Python 的多线程在 CPU 密集场景下其实不是真正并行)。线程之间的切换是抢占式的,操作系统决定什么时候切换。
协程是应用层调度的,所有协程在同一个线程里,通过 await 主动让出控制权。切换时机由代码决定,不是操作系统。
第二,开销不同。
线程的创建和切换开销比较大。每个线程要分配独立的栈空间,切换时要保存和恢复寄存器状态。
协程很轻量,创建和切换的开销都很小。可以同时跑几万个协程,不会有明显问题。
第三,并发能力不同。
Python 的多线程受 GIL 限制,CPU 密集任务多线程跑不快。但 IO 密集任务(比如网络请求、文件读写)可以用多线程。
协程天然适合 IO 密集场景。Agent 系统大部分时间在等模型响应、等数据库返回、等工具执行,用协程效率很高。
第四,编程模型不同。
多线程要考虑锁、竞争条件、死锁这些问题,代码复杂度高。
协程是协作式的,同一时刻只有一个协程在执行,不需要加锁,代码写起来更简单。
Agent 项目一般推荐用协程,因为主要瓶颈在 IO。FastAPI 默认就是异步的,和协程配合很好。
6、Redis常用数据结构有哪些?
Redis 常用的数据结构有五种,在 Agent 项目里各有用途。
第一,String(字符串)。
最基础的结构,可以存文本、数字。在 Agent 里常用来存会话状态、用户偏好、缓存结果。
# 存用户偏好
redis.set("user:001:language", "中文")
# 存缓存,5分钟过期
redis.setex("cache:query:001", 300, "搜索结果")第二,Hash(哈希)。
存键值对,适合存对象。比如用户信息、任务状态。
redis.hset("user:001", mapping={
"name": "张三",
"role": "admin",
"language": "中文"
})第三,List(列表)。
有序的元素集合,可以从两端插入和弹出。适合做消息队列、最近操作记录。
# 存最近的消息
redis.lpush("session:001:messages", message_json)
redis.ltrim("session:001:messages", 0, 99) # 只保留最近100条第四,Set(集合)。
无序的、不重复的元素集合。适合做标签、权限、去重。
# 用户能用的工具
redis.sadd("user:001:tools", "search_docs", "get_order")第五,Sorted Set(有序集合)。
带分数的有序集合。适合做排行榜、优先级队列。
# 按优先级排队
redis.zadd("task_queue", {"task_001": 10, "task_002": 5})除了这五种,Redis 还有 Stream(消息流)、Bitmap、HyperLogLog 等,但在 Agent 项目里用得相对少。
Agent 系统里最常用的还是 String 和 Hash,用来做状态缓存和用户信息。List 用来存会话消息,Set 用来做权限控制。
7、Redis如何实现状态管理?
Redis 在 Agent 状态管理里主要做两件事:存会话状态和存临时数据。
第一,会话状态管理。
每个会话有一个 session_id,用 Hash 存会话信息:
import redis
import json
r = redis.Redis()
def save_session_state(session_id: str, state: dict):
key = f"session:{session_id}"
r.hset(key, mapping={
"user_id": state["user_id"],
"messages": json.dumps(state["messages"]),
"status": state["status"],
"updated_at": str(datetime.now())
})
r.expire(key, 3600) # 1小时过期
def get_session_state(session_id: str) -> dict:
key = f"session:{session_id}"
data = r.hgetall(key)
if not data:
return None
return {
"user_id": data["user_id"].decode(),
"messages": json.loads(data["messages"]),
"status": data["status"].decode()
}第二,LangGraph checkpointer 后端。
LangGraph v1.0 的 checkpointer 可以用 Redis 做持久化存储。每次 Agent 执行完一个节点,状态会保存到 Redis,下次继续对话时恢复。
相比用数据库,Redis 读写快,适合高频状态更新。但要注意设置合理的过期时间,避免会话状态堆积。
第三,任务状态追踪。
异步任务执行时,可以用 Redis 记录任务状态:
def update_task_status(task_id: str, status: str):
r.hset(f"task:{task_id}", mapping={
"status": status,
"updated_at": str(datetime.now())
})前端可以轮询这个状态,或者通过 SSE 推送变化。
第四,分布式锁。
多个 Agent 实例处理同一个用户请求时,需要加锁避免状态冲突:
def acquire_lock(session_id: str):
lock_key = f"lock:session:{session_id}"
return r.set(lock_key, "1", nx=True, ex=10) # 10秒自动释放Redis 做状态管理的优势是快、支持过期、支持分布式。缺点是数据在内存里,成本比数据库高,不适合存大量历史数据。
8、Docker镜像和容器有什么区别?
Docker 镜像和容器的关系,可以类比成类和实例。
镜像是一个只读的模板,里面包含了运行应用需要的所有东西:操作系统基础、Python 环境、依赖包、应用代码、配置文件。镜像本身不能运行,它是静态的。
容器是镜像运行起来后的实例。一个镜像可以启动多个容器,每个容器有自己独立的文件系统和网络。容器里可以写入数据,但容器删除后,写入的数据也会丢失(除非挂载了数据卷)。
举个 Agent 项目的例子。
镜像里可能包含:
Python 3.11
FastAPI
LangChain v1.0
LangGraph v1.0
Redis 客户端
应用代码
配置文件容器是从这个镜像启动的进程。可以同时启动 3 个容器,处理不同用户的请求。
Dockerfile 大致长这样:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]构建镜像:
docker build -t agent-service .启动容器:
docker run -d -p 8000:8000 --name agent-001 agent-service容器的好处是环境一致。开发、测试、生产用同一个镜像,不会出现“在我机器上能跑”的问题。
9、如何部署Agent服务?
部署 Agent 服务要考虑几个方面:服务本身、状态存储、模型访问、工具依赖、监控和扩展。
第一,服务容器化。
用 Docker 把 Agent 服务打包成镜像,确保环境一致。
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]第二,状态存储。
LangGraph v1.0 的 checkpointer 和 store 要接持久化后端,比如 PostgreSQL 或 Redis。不能只用内存版本,否则服务重启状态就丢了。
第三,多实例部署。
Agent 服务本身设计成无状态的,状态都放外部存储。这样可以横向扩展,多实例部署在 K8s 或者云服务器上,用负载均衡分发请求。
第四,模型服务。
模型 API 的 key 和配置要通过环境变量或配置中心注入,不要硬编码在代码里。
# docker-compose.yml
services:
agent:
image: agent-service:latest
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:pass@db:5432/agent_db第五,依赖服务。
Redis、数据库、向量数据库这些依赖服务要独立部署,做好备份和高可用。
第六,监控和日志。
部署时要接入监控系统,记录请求量、延迟、错误率、Token 消耗。日志要结构化,方便查询和分析。
第七,CI/CD。
代码提交后自动跑测试、构建镜像、部署到测试环境。验证通过后再部署到生产。Agent 项目改 Prompt、改工具描述都可能影响行为,自动化测试很重要。
一个简单的部署架构:
用户请求
↓
负载均衡(Nginx / ALB)
↓
Agent 服务集群(多个 Docker 容器)
↓
├── Redis(会话状态)
├── PostgreSQL(持久化状态 + 结构化记忆)
├── 向量数据库(RAG 检索)
└── 模型 API(Claude / OpenAI)10、如何设计REST API?
Agent 项目的 REST API 设计要兼顾易用性和 Agent 的特殊需求。
第一,基本接口设计。
Agent 服务通常需要这几个核心接口:
POST /chat 发送消息,获取回答
POST /chat/stream 发送消息,流式返回
POST /sessions 创建新会话
GET /sessions/{id} 获取会话历史
DELETE /sessions/{id} 删除会话第二,请求和响应结构。
请求体要清晰:
class ChatRequest(BaseModel):
message: str
session_id: str | None = None
user_id: str
class ChatResponse(BaseModel):
answer: str
session_id: str
sources: list[str] = []
tool_calls: list[ToolCall] = []第三,流式输出。
Agent 回答可能需要几秒甚至十几秒,流式输出能改善用户体验。用 SSE(Server-Sent Events)实现:
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
async def generate():
config = {"configurable": {"thread_id": request.session_id}}
async for event in agent.astream_events(
{"messages": [{"role": "user", "content": request.message}]},
config=config,
version="v2"
):
if event["event"] == "on_chat_model_stream":
chunk = event["data"]["chunk"].content
yield f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")LangGraph v1.0 的 astream_events 接口可以拿到模型生成的每个 token、工具调用的每个事件,适合做流式推送。
第四,错误处理。
统一错误格式,不要一会儿返回字符串,一会儿返回 JSON:
class ErrorResponse(BaseModel):
error: str
code: str
details: dict = {}
@app.exception_handler(Exception)
async def global_exception(request, exc):
return JSONResponse(
status_code=500,
content={"error": "服务内部错误", "code": "INTERNAL_ERROR"}
)第五,认证和限流。
每个请求要带上 token 或 API key,服务端验证用户身份。同时做限流,防止单个用户把服务打满。
第六,版本管理。
API 路径带上版本号,比如 /api/v1/chat。后续接口变更时不会影响老客户端。